/*
 * @Author: lokei
 * @Date: 2022-09-21 18:43:12
 * @LastEditors: lokei
 * @LastEditTime: 2022-10-27 08:41:57
 * @Description: 
 */
package cn.lokei.task.handler;

import java.util.List;

import javax.annotation.Resource;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.domain.Range;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.connection.stream.ObjectRecord;
import org.springframework.data.redis.core.StreamOperations;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.stream.StreamListener;
import org.springframework.stereotype.Component;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;

import cn.lokei.task.entity.printer.Printer;
import cn.lokei.task.entity.redis.RedisStream;
import cn.lokei.task.handler.iot.pc.PCHandler;
import cn.lokei.task.service.print.PrintService;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;

/**
 * Redis Stream listener that dispatches incoming records by stream name.
 *
 * <p>Two streams are handled:
 * <ul>
 *   <li>{@code smt_print} - the entry's {@code printer} field is parsed as JSON into a
 *       {@link Printer}, and the entry's {@code content} field is passed to
 *       {@link PrintService#print};</li>
 *   <li>{@code iot_mqtt_ops} - the entry is passed to {@link PCHandler#handler}.</li>
 * </ul>
 * After a handler returns normally, the record is acknowledged for the stream's consumer
 * group and then deleted from the stream. Records from any other stream are ignored.
 */
@Slf4j
@Component
public class RedisStreamListenerMessage implements StreamListener<String, ObjectRecord<String, String>> {

    /** Stream carrying print jobs. */
    private static final String PRINT_STREAM = "smt_print";

    /** Consumer group used when acknowledging print jobs. */
    private static final String PRINT_GROUP = "smt_print_grp";

    /** Stream carrying IoT MQTT operations. */
    private static final String IOT_STREAM = "iot_mqtt_ops";

    /** Consumer group used when acknowledging IoT MQTT operations. */
    private static final String IOT_GROUP = "iot_mqtt_ops_grp";

    /**
     * Redis Stream utility class (used here for ACK).
     */
    @Autowired
    private RedisStream redisStream;

    @Autowired
    private StringRedisTemplate stringRedisTemplate;

    @Resource
    PrintService printService;

    @Autowired
    PCHandler pcHandler;

    /**
     * Message listener callback: routes the record to the handler for its stream.
     *
     * <p>Each {@code case} ends with an explicit {@code break}; without it a print message
     * would fall through into the IoT branch and be looked up (and possibly acknowledged
     * and deleted) on the wrong stream.
     *
     * @param message the record delivered by the listener container
     */
    @SneakyThrows
    @Override
    public void onMessage(ObjectRecord<String, String> message) {
        log.info("接受redisStream: {},监听到消息：{}", message.getStream(), message);
        String messageTopic = message.getStream();
        if (messageTopic == null) {
            return;
        }

        StreamOperations<String, String, String> streamOperations = this.stringRedisTemplate.opsForStream();
        switch (messageTopic) {
            case PRINT_STREAM:
                handlePrintMessage(streamOperations, message);
                break;
            case IOT_STREAM:
                handleIotMessage(streamOperations, message);
                break;
            default:
                // Unknown stream: nothing to do.
                break;
        }
    }

    /**
     * Handles one {@code smt_print} record: builds the {@link Printer}, prints, ACKs, deletes.
     *
     * @param streamOperations stream operations used to read and delete the entry
     * @param message          the record delivered to {@link #onMessage}
     * @throws Exception whatever the print service or Redis access throws; it propagates to
     *                   {@link #onMessage}, so the record is neither acknowledged nor deleted
     */
    private void handlePrintMessage(StreamOperations<String, String, String> streamOperations,
            ObjectRecord<String, String> message) throws Exception {
        MapRecord<String, String, String> record = fetchRecord(streamOperations, PRINT_STREAM, message);
        if (record == null) {
            return;
        }

        // NOTE(review): a missing or non-JSON "printer" field makes the lines below throw;
        // confirm producers always populate it.
        String printerJson = record.getValue().get("printer");
        JSONObject jsonObject = JSON.parseObject(printerJson);
        Printer printer = new Printer();
        printer.setType(jsonObject.getString("type"));
        printer.setUser(jsonObject.getString("user"));
        printer.setUkey(jsonObject.getString("ukey"));
        printer.setSn(jsonObject.getString("sn"));
        printService.print(printer, record.getValue().get("content"));

        // Confirm consumption (ACK) once processing has finished, then remove the entry.
        redisStream.ack(String.valueOf(message.getStream()), PRINT_GROUP, String.valueOf(message.getId()));
        streamOperations.delete(record);
    }

    /**
     * Handles one {@code iot_mqtt_ops} record: delegates to {@link PCHandler}, ACKs, deletes.
     *
     * @param streamOperations stream operations used to read and delete the entry
     * @param message          the record delivered to {@link #onMessage}
     * @throws Exception whatever the handler or Redis access throws; it propagates to
     *                   {@link #onMessage}, so the record is neither acknowledged nor deleted
     */
    private void handleIotMessage(StreamOperations<String, String, String> streamOperations,
            ObjectRecord<String, String> message) throws Exception {
        MapRecord<String, String, String> record = fetchRecord(streamOperations, IOT_STREAM, message);
        if (record == null) {
            return;
        }

        pcHandler.handler(record);

        // Confirm consumption (ACK) once processing has finished, then remove the entry.
        redisStream.ack(String.valueOf(message.getStream()), IOT_GROUP, String.valueOf(message.getId()));
        streamOperations.delete(record);
    }

    /**
     * Reads the entry with the message's ID from the given stream.
     *
     * <p>The range starts and ends at the same ID, so at most one entry is expected; the
     * first one is returned.
     *
     * @param streamOperations stream operations used for the range query
     * @param stream           name of the stream to query
     * @param message          the record whose ID is looked up
     * @return the matching entry, or {@code null} when the query returns nothing
     */
    private MapRecord<String, String, String> fetchRecord(StreamOperations<String, String, String> streamOperations,
            String stream, ObjectRecord<String, String> message) {
        String recordId = message.getId().toString();
        List<MapRecord<String, String, String>> result = streamOperations.range(stream,
                Range.closed(recordId, recordId));
        if (result == null || result.isEmpty()) {
            return null;
        }
        return result.get(0);
    }

}
